import importlib
import shutil
import os
import sentry_sdk
from scarabaeus.enum import Status, ErrorCode, Task
from scarabaeus.interface.runable import RunAble
from scarabaeus.lib.error.error_messenger import ErrorMessenger
from scarabaeus.config import Config
from scarabaeus_task_runner.datas.dependency import Dependency
from scarabaeus_task_runner.frameworks.task_runner_facade import TaskRunnerFacade
from scarabaeus_task_runner.utils.auto_discover import load_module
from scarabaeus_task_runner.utils.query import fetch_task_info_by_id, update_common_task, log_common_task_start
class Main(RunAble):
    """Entry point that fetches one task, runs it, and cleans up afterwards.

    The workflow implemented by :meth:`run` is:

    1. build the shared ``Dependency`` object,
    2. fetch the task record identified by the ``TASK_ID`` environment
       variable,
    3. load the tool class named by the task's ``module_name`` field and
       instantiate it,
    4. create a per-task runtime directory and ``chdir`` into it,
    5. run the task and record its final status,
    6. restore the original working directory and delete the runtime
       directory (performed even when step 5 raises).
    """

    # Shared services; this class uses its ``logger`` and ``config``.
    _dependency: Dependency
    # Task record returned by ``fetch_task_info_by_id``; the keys ``id`` and
    # ``module_name`` are read here.
    _task_info: dict
    # Instance of the tool class loaded for this task.
    _task_runner: TaskRunnerFacade
    # Working directory in effect before switching to the runtime directory.
    _origin_cwd: str
    # Per-task scratch directory: ``<BASE_PATH>/<task id>``.
    _runtime_path: str

    def run(self):
        """Execute the full task workflow.

        Raises:
            Exception: whatever the task (or any preparation step) raises is
                propagated to the caller; the runtime directory is still
                cleaned up once it has been set up.
        """
        self._initialize_runner()
        self._fetch_task_info()
        self._load_tool_runner()
        self._setup_environment()
        try:
            self._run_task()
        finally:
            # _run_task re-raises on failure; without ``finally`` a failed
            # task would leave the process inside the runtime directory and
            # the directory itself would never be removed.
            self._clean_environment()

    def _initialize_runner(self):
        """Create the ``Dependency`` object used by every later step."""
        self._dependency = Dependency()

    def _fetch_task_info(self):
        """Look up the task record for the id given in ``TASK_ID``."""
        task_id = os.getenv('TASK_ID')
        self._dependency.logger.info('Fetch task info by id: {}'.format(task_id))
        task_info = fetch_task_info_by_id(task_id)
        self._dependency.logger.info('Fetch task info: {}'.format(task_info))
        self._task_info = task_info

    def _load_tool_runner(self):
        """Load the tool class named by the task and instantiate it."""
        module_name = self._task_info['module_name']
        tool_class = load_module(module_name)
        self._dependency.logger.info('Load tool class: {}'.format(tool_class))
        self._task_runner = tool_class(self._dependency, self._task_info)

    def _setup_environment(self):
        """Create the per-task runtime directory and make it the cwd."""
        task_id = str(self._task_info['id'])
        base_path = self._dependency.config.BASE_PATH
        runtime_path = os.path.join(base_path, task_id)
        self._dependency.logger.info(f'Runtime path: {runtime_path}')
        os.makedirs(runtime_path, exist_ok=True)
        # Remember where we came from so _clean_environment can go back.
        self._origin_cwd = os.getcwd()
        os.chdir(runtime_path)
        self._runtime_path = runtime_path

    def _run_task(self):
        """Run the loaded task and persist its final status.

        On success the task is marked ``Status.END`` together with the
        runner's ``output_s3_key``. On any failure it is marked
        ``Status.ERROR``; a ``ValueError`` is additionally logged and
        recorded through ``ErrorMessenger`` as a bad request.

        Raises:
            ValueError: re-raised after being logged and recorded.
            Exception: any other failure, re-raised after the status update.
        """
        task_id = self._task_info['id']
        log_common_task_start(task_id)
        try:
            self._task_runner()
            output_s3_key = self._task_runner.output_s3_key
            update_common_task(task_id, Status.END, output_s3_key)
        except ValueError as e:
            self._dependency.logger.error(e)
            update_common_task(task_id, Status.ERROR)
            ErrorMessenger.record(
                ErrorCode.SCAR_BAD_REQUEST.CODE,
                ErrorCode.SCAR_BAD_REQUEST.SUB_CODE,
                str(e),
                Task.AUTO_PACK,
                task_id,
            )
            # Bare ``raise`` re-raises the active exception unchanged.
            raise
        except Exception:
            update_common_task(task_id, Status.ERROR)
            raise

    def _clean_environment(self):
        """Return to the original cwd and delete the runtime directory."""
        self._dependency.logger.info('Clean environment')
        # Leave the directory before removing it.
        os.chdir(self._origin_cwd)
        shutil.rmtree(self._runtime_path)
if __name__ == '__main__':
    # Script entry: initialise Sentry first, then build the runner and
    # invoke it.
    sentry_sdk.init(
        Config.SENTRY_ENDPOINT,
        traces_sample_rate=1.0,
        environment=Config.ENV,
    )
    Main()()